package com.h2cloud.JUCTest.patten;


import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

@Slf4j(topic = "c.TestProducerConsumer")
public class TestProducerConsumer {
	public static void main(String[] args) {
		MessageQueue messageQueue = new MessageQueue(2);
		for (int i = 0; i < 4; i++) {
			int id = i;
			new Thread(() -> {
				try {
					log.debug("download...");
					List<String> response = Downloader.download();
					log.debug("try put message({})", id);
					messageQueue.put(new Message(id, response));
				} catch (IOException e) {
					e.printStackTrace();
				}
			}, "生产者" + i).start();
		}

		new Thread(() -> {
			while (true) {
				Message message = messageQueue.take();
				List<String> response = (List<String>) message.getMessage();
				log.debug("take message({}): [{}] lines", message.getId(), response.size());
			}

		}, "消费者").start();
	}
}

@Data
@AllArgsConstructor
class Message {
	private int id;
	private Object message;
}

@Slf4j(topic = "c.MessageQueue")
class MessageQueue {
	private LinkedList<Message> queue;
	private int capacity;

	public MessageQueue(int capacity) {
		this.capacity = capacity;
		queue = new LinkedList<>();
	}

	public Message take() {
		synchronized (queue) {
			while (queue.isEmpty()) {
				log.debug("没货了, wait");
				try {
					queue.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			Message message = queue.removeFirst();
			queue.notifyAll();
			return message;
		}
	}

	public void put(Message message) {
		synchronized (queue) {
			while (queue.size() == capacity) {
				log.debug("库存已达上限, wait");
				try {
					queue.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			queue.addLast(message);
			queue.notifyAll();
		}
	}
}
